In [None]:
# Install to run checks on data in the Unity catalog
pip install -i https://pypi.cloud.soda.io soda-spark-df

In [None]:
# Install to run checks on data in the file stored in Databricks file system
pip install -i https://pypi.cloud.soda.io soda-spark[databricks]

In [None]:
# Restart the notebook's Python process.
# NOTE(review): presumably needed so the packages installed in the cells above become importable — confirm.
dbutils.library.restartPython()

In [None]:
# Standard library
import json
from datetime import datetime, timedelta
from io import StringIO
from pathlib import Path

# Third-party: YAML support and the Scan entry point from Soda Library
import yaml
from soda.scan import Scan

# Root folder of the employee attrition project in the workspace; defined once so the
# settings and results folders cannot drift apart.
project_path = Path('/Workspace/Users/user@soda.io/employee_attrition')

# Folder holding the Soda settings files (checks YAML and configuration YAML)
settings_path = project_path / 'soda_settings'

# Folder that receives the scan result files
result_path = project_path / 'checks_output'


In [None]:
# Partition to scan: yesterday's date rendered as YYYY-MM-DD
# (interpolated into the PartitionDate=... folder name when the file is read).
partition = f"{datetime.today().date() - timedelta(days=1):%Y-%m-%d}"

In [None]:
# Create the scan object; the Spark session, checks and configuration are attached to it in the cells below
scan = Scan()

# Label this scan run and select the data source the scan should use.
# NOTE(review): the Spark session is later registered with data_source_name="login_logout.py",
# which differs from "employee_info" set here — confirm that "employee_info" is defined in the
# configuration YAML and that the two different names are intentional.
scan.set_scan_definition_name("Employee Attrition Scan")
scan.set_data_source_name("employee_info")

In [None]:
# Load the CSV data of the selected partition, treating the first row as the header
df = spark.read.csv(
    f"dbfs:/Workspace/Users/user@soda.io/employee_attrition/soda_settings/login_logout/PartitionDate={partition}",
    header=True,
)

# Expose the DataFrame as the temporary view "login_logout" so it can be referenced by name
df.createOrReplaceTempView("login_logout")

# Hand the Spark session to the scan, registered under the data source name "login_logout.py"
scan.add_spark_session(spark, data_source_name="login_logout.py")

In [None]:
# Read the checks YAML file from the settings folder as plain text
ingestion = (settings_path / "ingestion_checks.yml").read_text()

# Register the checks with the scan. add_sodacl_yaml_str takes the YAML text itself,
# so the string is passed directly rather than wrapped in a file-like StringIO object.
scan.add_sodacl_yaml_str(ingestion)

In [None]:
# Read the Soda configuration file from the settings folder as plain text
cfg_content = (settings_path / "soda_conf.yml").read_text()

# Add the data source connection configuration to the scan. add_configuration_yaml_str
# takes the YAML text itself, so the string is passed directly rather than wrapped in a
# file-like StringIO object.
scan.add_configuration_yaml_str(cfg_content)

In [None]:
# Execute the scan using everything attached to the scan object in the cells above
scan.execute()

# Print the scan's complete log text to the notebook output for inspection
# (the Scan object offers further methods for examining the result)
print(scan.get_logs_text())

In [None]:
# Collect the scan results as a dictionary for further analysis
metadata = scan.build_scan_results()

# Today's date as YYYY-MM-DD, used to name the result file
scan_date = f"{datetime.now().date():%Y-%m-%d}"

# Pass the per-check results to save_scan_result_to_file under a dated file name in the results folder
scan.save_scan_result_to_file(
    result_path.joinpath(f"ingestion_result_{scan_date}.json"),
    metadata['checks'],
)

In [None]:
# Reduce every check result to a small summary record: four fields copied from the check
# (None when a field is absent) plus the scan-level end timestamp.
checks_data = [
    {
        **{field: check.get(field) for field in ('column', 'dataSource', 'outcome', 'name')},
        'scanEndTimestamp': metadata['scanEndTimestamp'],
    }
    for check in metadata.get('checks', [])
]

# output_data = {
#     'scanEndTimestamp':metadata['scanEndTimestamp'],
#     'checks': checks_data
# }

In [None]:
# Serialize the check summary records to a JSON string
# (json is imported with the other modules in the notebook's import cell).
checks_data_json = json.dumps(checks_data)

In [None]:
# Save the serialized check summaries as test.json in the results folder.
# NOTE(review): the earlier save call passes a list of check dicts, whereas this one passes an
# already-serialized JSON string — confirm which form save_scan_result_to_file expects.
scan.save_scan_result_to_file(result_path / "test.json", checks_data_json)